

@MasterPtato
Contributor

No description provided.

@vercel

vercel bot commented Nov 18, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
rivetkit-serverless Error Error Nov 21, 2025 7:57pm
3 Skipped Deployments
Project Deployment Preview Comments Updated (UTC)
rivet-cloud Ignored Ignored Nov 21, 2025 7:57pm
rivet-inspector Ignored Ignored Preview Nov 21, 2025 7:57pm
rivet-site Ignored Ignored Preview Nov 21, 2025 7:57pm

Contributor Author

MasterPtato commented Nov 18, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude

claude bot commented Nov 18, 2025

PR Review: Fix batch listen, fix history for graceful signal send in workflows

Summary

This PR introduces two main improvements to the gasoline workflow engine:

  1. Graceful signal sending - adds a graceful_not_found() method to the signal builders; when it is set, send() returns Ok(None) instead of erroring when the target workflow is not found
  2. Batch listen fix - corrects signal batching to properly sort by timestamp and handle indexed input chunks
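To make the first point concrete, here is a minimal synchronous sketch of the opt-in builder behavior. It is not the gasoline implementation: SignalBuilder, SendError, the live_workflows set, and the fake signal id are all hypothetical stand-ins, and the real builders are async.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a workflow or signal id.
pub type Id = u64;

#[derive(Debug, PartialEq)]
pub enum SendError {
    WorkflowNotFound,
}

/// Hypothetical builder; only the opt-in flag mirrors the PR.
pub struct SignalBuilder<'a> {
    live_workflows: &'a HashSet<Id>,
    target: Id,
    graceful_not_found: bool,
}

impl<'a> SignalBuilder<'a> {
    pub fn new(live_workflows: &'a HashSet<Id>, target: Id) -> Self {
        Self { live_workflows, target, graceful_not_found: false }
    }

    /// Opt in to `Ok(None)` instead of an error when the target is missing.
    pub fn graceful_not_found(mut self) -> Self {
        self.graceful_not_found = true;
        self
    }

    /// Returns `Ok(Some(signal_id))` when the target exists.
    pub fn send(self) -> Result<Option<Id>, SendError> {
        if self.live_workflows.contains(&self.target) {
            // Fake signal id, derived from the target for the sketch only.
            Ok(Some(self.target + 1000))
        } else if self.graceful_not_found {
            Ok(None)
        } else {
            Err(SendError::WorkflowNotFound)
        }
    }
}
```

Callers that do not opt in keep the old error behavior, which is why the flag lives on the builder rather than changing send() for everyone.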

Code Quality & Best Practices

Strengths:

  • Consistent API design across both common/signal.rs and workflow/signal.rs builders
  • Good documentation with doc comments explaining behavior (lines 107-108, 118-119 in both files)
  • Follows existing error handling patterns using Result<Option<Id>>
  • Proper tracing/logging throughout with structured logging as per CLAUDE.md guidelines
  • Loop optimization improvements maintain good performance characteristics

Suggestions:

  1. Type annotation consistency (gasoline/src/db/kv/mod.rs:1945):

    let mut signals = futures_util::stream::iter(owned_filter.clone())

    The mut is added here, but signals were already being collected into a Vec. This is correct now for the sort operation on line 1980, but consider adding a comment explaining why this needs to be mutable.

  2. Error message clarity (api-peer/src/actors/delete.rs:74-77):
    The warning message could be more specific about which operation failed:

    tracing::warn!(
        actor_id=?path.actor_id,
        "actor workflow not found when sending destroy signal, likely already stopped"
    );

Potential Bugs & Issues

Critical:

  1. Database index insertion logic (gasoline/src/db/kv/debug.rs:931-939 and gasoline/src/db/kv/mod.rs:1505-1512):

    if let Some(input_chunks) = current_event.indexed_input_chunks.get_mut(key.index) {
        input_chunks.push(entry);
    } else {
        current_event.indexed_input_chunks.insert(key.index, vec![entry]);
    }

    This is a good fix! The previous code had a logic error where it would only push to existing indices but never create new ones. This could have caused data loss in signal batching. The fix properly handles both cases.

  2. Timestamp consistency (gasoline/src/db/kv/mod.rs:1980-2012):
    Good catch fixing the timestamp! The code now:

    • Sorts signals by key.ts (line 1980)
    • Uses a single now timestamp for all acks (line 1982, 2011)
    • Applies limit AFTER sorting but BEFORE processing (line 1995)

    This ensures consistent ordering and prevents race conditions.
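The three steps listed above can be sketched in isolation. This is a simplified, synchronous model rather than the code in db/kv/mod.rs: SignalKey, Ack, and select_batch are hypothetical names, and timestamps are plain integers.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct SignalKey {
    pub name: &'static str,
    pub ts: i64,
}

#[derive(Debug, PartialEq)]
pub struct Ack {
    pub index: usize,
    pub name: &'static str,
    pub ack_ts: i64,
}

/// Sort the collected signals by timestamp, keep the earliest `limit`,
/// and stamp every ack with the same `now` value.
pub fn select_batch(mut signals: Vec<SignalKey>, limit: usize, now: i64) -> Vec<Ack> {
    // sort_by_key is stable: equal timestamps keep their fetch order.
    signals.sort_by_key(|key| key.ts);
    signals
        .into_iter()
        .take(limit) // limit applied after sorting, before processing
        .enumerate() // index within the batch
        .map(|(index, key)| Ack { index, name: key.name, ack_ts: now })
        .collect()
}
```

Taking the limit before sorting would keep whichever signals happened to be fetched first, not the earliest ones, which is the ordering problem the review describes.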

Minor:

  1. Event type mismatch fix (gasoline/src/db/kv/keys/history.rs:1645):
    EventType::Signal -> EventType::Signals
    Good catch! This was incorrectly using the singular form for batch operations.

Performance Considerations

Improvements:

  1. Loop event upsertion parallelization (gasoline/src/ctx/workflow.rs:872-978):
    The new approach defers loop event upserts to run in parallel with the next iteration. This is a clever optimization that should reduce latency in tight loops. The 4-way join on lines 910-940 is well-documented with comments explaining the parallel execution strategy.

  2. Signal batching optimization (gasoline/src/db/kv/mod.rs:1995):
    Applying .take(limit) after sorting but before processing is more efficient than the previous approach of sorting after collection.

Concerns:

  1. The buffer_unordered(1024) on line 2063 could cause memory pressure if processing 1024 signals simultaneously. Consider if this limit should be configurable or tied to the input limit parameter.

Security Concerns

No security issues identified. The changes:

  • Properly handle missing workflows without exposing sensitive information
  • Maintain transactional integrity with conflict ranges
  • Don't introduce new attack surfaces

Test Coverage

Missing test scenarios:

  1. Test that graceful_not_found() properly creates and replays removed events in workflow history
  2. Test signal batching with timestamps in non-chronological order to verify sorting
  3. Test the indexed input chunks fix with multiple signals in a batch
  4. Test loop event upsertion parallelization with rapid iterations

Recommended test additions:

// Test graceful signal send with replay
// (a plain #[test] cannot be async; #[tokio::test] assumes the tokio test runtime)
#[tokio::test]
async fn test_graceful_signal_not_found_replay() {
    // First run: target doesn't exist, should create removed event
    // Second run: should replay removed event correctly
}

// Test batch signal ordering
#[tokio::test]
async fn test_batch_signal_timestamp_ordering() {
    // Create signals with out-of-order timestamps
    // Verify they're received in timestamp order
}

Additional Notes

  1. Display text changes (engine/src/util/wf/mod.rs): The changes from "signal receive" to "signal recv" for consistency are good, but ensure this doesn't break any external tooling that parses these strings.

  2. Logging level change (universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs:67): Changing from info! to debug! for conflict logging is appropriate as this could be noisy in production.

  3. TypeScript comment (runner-protocol/src/index.ts:56-58): The TODO comment about renaming createTs to update_ts should be addressed in a future PR to maintain naming consistency per CLAUDE.md conventions.

Verdict

LGTM with minor suggestions

This is a solid improvement to the workflow engine. The graceful signal sending feature is well-implemented with proper history tracking for replay consistency. The batch listen fixes address real bugs in signal ordering and indexed chunk handling. The loop optimization should provide measurable performance improvements.

The code follows project conventions, has good documentation, and maintains backward compatibility. Recommended to merge after addressing the test coverage gaps.


Recommendation: Approve with suggestion to add tests before merge.

@pkg-pr-new

pkg-pr-new bot commented Nov 18, 2025

More templates

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@3479

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@3479

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@3479

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@3479

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@3479

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@3479

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@3479

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@3479

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@3479

commit: 321b2c3

@MasterPtato MasterPtato force-pushed the 11-17-feat_gas_add_batch_listen branch from 25ece3d to 6ff29f6 Compare November 18, 2025 23:50
@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from 0ca2fa2 to 321b2c3 Compare November 18, 2025 23:50
@claude

claude bot commented Nov 18, 2025

Critical Issues - Must Fix Before Merge

  1. Missing Variable Assignments - In several files, the code calls .await? but then tries to use res which was never assigned. This will cause compilation errors.

Affected locations:

  • engine/packages/api-peer/src/actors/delete.rs:67-73
  • engine/packages/guard/src/routing/pegboard_gateway.rs:195-207
  • engine/packages/pegboard-serverless/src/lib.rs:483-507
  • engine/packages/pegboard/src/workflows/runner.rs (4 occurrences)

Example of what needs fixing - change from this pattern:

.graceful_not_found()
.send()
.await?;
if res.is_none() {

To this pattern:

let res = ctx.signal(...)
    .graceful_not_found()
    .send()
    .await?;
if res.is_none() {

  2. Batch Listen Limit Mismatch - In db/kv/mod.rs:1995, the limit is applied AFTER the history event is inserted but BEFORE acknowledgment. This could cause signals beyond the limit to be lost or re-delivered incorrectly.

Positive Changes:

  • Better API ergonomics with graceful_not_found builder pattern
  • Proper history handling via removed events for replay determinism
  • Batch signal sorting by timestamp for correct ordering
  • Loop event upsertion properly parallelized

Recommendations:

  • Fix missing res variable assignments (critical)
  • Verify batch listen limit application (critical)
  • Add tests for graceful signal sending and replay
  • Add more context to debug logs
  • Verify EventType::Signals change backward compatibility

The core concept is solid - adding graceful degradation for missing signal targets is the right approach. However, compilation errors need fixing before merge.

@claude

claude bot commented Nov 18, 2025

Code Review: Batch Listen & Graceful Signal Send

This PR introduces important improvements to the workflow signal system, but there are several critical issues that need to be addressed before merging.


Critical Issue: Missing runner2 Module

Severity: Critical - Breaks Compilation

Multiple files reference workflows::runner2 but the module does not exist:

  • engine/packages/pegboard/src/lib.rs:17 - Tries to register runner2::Workflow
  • engine/packages/pegboard/src/workflows/actor/mod.rs:6 - Imports runner2::AllocatePendingActorsInput
  • engine/packages/pegboard-runner/src/ping_task.rs:24 - References runner2::Input
  • engine/packages/pegboard-serverless/src/lib.rs:483+ - References runner2::Stop and runner2::Workflow

However, engine/packages/pegboard/src/workflows/mod.rs only declares pub mod actor and pub mod runner.

Fix Required: Either add pub mod runner2 and create the module file, OR revert all references back to runner if runner2 was intended for a different PR.

@claude

claude bot commented Nov 18, 2025

Code Quality Issues

1. Potential Logic Issue in Batch Listen Sort/Limit

Severity: Medium

In engine/packages/gasoline/src/db/kv/mod.rs:1945-2067, the limit is applied twice:

  1. First, when fetching signals from each signal type (up to limit per type)
  2. Second, after sorting all signals by timestamp (taking only limit total)

If there are multiple signal types in the filter, this could fetch more signals than needed. For example, with 3 signal types and limit=10, you might fetch up to 30 signals, then sort and take 10.

Recommendation: Consider using limit: None in the initial fetch, or adjust the per-type limit to account for multiple signal types.

2. Inconsistent Event Type Naming

Severity: Low

The signal batch receive event is called different things across the codebase:

  • In db/debug.rs:125 - signal receive
  • In db/kv/keys/history.rs:1645 - EventType::Signals
  • In engine util/wf/mod.rs - signal recv

Consider standardizing terminology.

@claude

claude bot commented Nov 18, 2025

Good Practices Observed

1. Excellent Error Handling Pattern

The graceful_not_found() API is well-designed and much cleaner than the previous pattern of checking for specific error types. This is a clean builder API that improves code readability significantly!

2. Proper History Tracking for Graceful Not Found

The implementation correctly handles replay scenarios by:

  • Checking is_removed() during replay (engine/packages/gasoline/src/builder/workflow/signal.rs:117-126)
  • Creating a removed event when the target is not found (lines 176-190)
  • Comparing removed events during replay to ensure consistency

This maintains workflow determinism properly.
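A toy model of why the removed event matters for replay is sketched below. The types are hypothetical (HistoryEvent, Workflow, and send_signal_graceful are not gasoline names) and the target lookup is reduced to a boolean; the point is only that a replay follows recorded history and never re-runs the lookup.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum HistoryEvent {
    SignalSend { signal_id: u64 },
    /// Placeholder recorded when the target workflow was not found.
    Removed,
}

pub struct Workflow {
    pub history: Vec<HistoryEvent>,
    pub cursor: usize,
    pub sends_attempted: usize,
}

impl Workflow {
    pub fn new(history: Vec<HistoryEvent>) -> Self {
        Self { history, cursor: 0, sends_attempted: 0 }
    }

    /// `target_exists` stands in for the real workflow lookup.
    pub fn send_signal_graceful(&mut self, target_exists: bool) -> Option<u64> {
        // Replay path: trust the recorded event and skip the lookup entirely.
        if let Some(event) = self.history.get(self.cursor).cloned() {
            self.cursor += 1;
            return match event {
                HistoryEvent::Removed => None,
                HistoryEvent::SignalSend { signal_id } => Some(signal_id),
            };
        }

        // First execution: attempt the send and record the outcome.
        self.sends_attempted += 1;
        let (event, result) = if target_exists {
            let signal_id = self.cursor as u64 + 1;
            (HistoryEvent::SignalSend { signal_id }, Some(signal_id))
        } else {
            (HistoryEvent::Removed, None)
        };
        self.history.push(event);
        self.cursor += 1;
        result
    }
}
```

Even if the target workflow exists by the time of the replay, the replayed call still yields None, so code after the send takes the same branch as on the first run.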

3. Loop Optimization Parallelization

The loop optimization in engine/packages/gasoline/src/ctx/workflow.rs:866-979 is well-documented with detailed comments explaining the parallelization strategy, which is excellent for maintainability.

@claude

claude bot commented Nov 18, 2025

Test Coverage

Missing Tests for New Features

Severity: Medium

The PR adds significant new functionality but I do not see corresponding tests:

  1. No test for graceful_not_found() - Should test:

    • Signal to non-existent workflow returns Ok(None)
    • Replay of gracefully-not-found signal works correctly
    • Without graceful_not_found(), signal to non-existent workflow still returns error
  2. No test for batch listen - Should test:

    • Multiple signals received in correct timestamp order
    • Limit is applied correctly
    • Multiple signal types are handled properly

Recommendation: Add integration tests in engine/packages/gasoline/tests/workflows/ following the pattern of signal_test.rs.

@claude

claude bot commented Nov 18, 2025

Summary

Must Fix Before Merge:

  1. Resolve the missing runner2 module issue (critical compilation error)

Should Fix:

  2. Add test coverage for graceful_not_found and batch listen
  3. Review the batch listen limit logic (potential over-fetching)
  4. Standardize signal event naming across the codebase

Nice to Have:

  5. Review the log level change in universaldb transaction_conflict_tracker.rs:67 (changed from info to debug)


Overall, this is a solid improvement to the signal system with good attention to replay consistency and error handling. The main blocker is the runner2 compilation issue. Once that is resolved and tests are added, this should be ready to merge.

Great work on the graceful error handling pattern!

@MasterPtato MasterPtato force-pushed the 11-17-feat_gas_add_batch_listen branch from 6ff29f6 to dba469c Compare November 19, 2025 22:06
@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from 321b2c3 to aa9ef2a Compare November 19, 2025 22:06
@claude

claude bot commented Nov 19, 2025

Pull Request Review

This PR introduces important fixes for batch signal listening and adds graceful handling for signal sends when the target workflow doesn't exist. Here's my analysis:

Strengths

  1. Graceful Not Found Handling: The new graceful_not_found() API is well-designed and consistent across both workflow and operation signal builders. This is a clean solution for handling cases where workflows may have already been stopped.

  2. Batch Listen Fix: The sorting of signals by timestamp (signals.sort_by_key(|key| key.ts)) at db/kv/mod.rs:1980 ensures signals are processed in the correct order. This is critical for maintaining event ordering guarantees.

  3. Loop Optimization: The parallel execution improvement in ctx/workflow.rs:910-940 is excellent. Deferring the loop event upsert to run in parallel with the next iteration (loop_event_upsert_fut) reduces latency without sacrificing correctness.

  4. Proper Replay Handling: The signal builder correctly handles replay scenarios by checking for removed events at builder/workflow/signal.rs:117-126, ensuring deterministic workflow execution.

  5. Consistent Error Handling: Good use of the graceful_not_found pattern across multiple call sites (api-peer, guard, pegboard-serverless, pegboard runner workflow).

🔍 Issues & Concerns

Critical: Index Bug in Batch Listen

At db/kv/mod.rs:2001 and db/kv/debug.rs:935, there's a concerning pattern:

let packed_key = self.subspace.pack(&key);

In the main code path, you pack with self.subspace, but earlier you iterate with take(limit). However, at line 1995, you have:

.into_iter().take(limit).enumerate()

This take(limit) happens after sorting but you're also applying limit earlier when fetching from each stream. This could lead to incorrect behavior if signals from multiple streams need to be interleaved by timestamp.

Issue: You fetch up to limit signals from each signal name stream, sort them all, then take the first limit. If you have 3 signal types and limit=10, you could fetch 30 signals, sort, then take 10. This might be intentional but seems wasteful. Consider documenting this behavior or optimizing to use a min-heap approach for fetching from multiple streams.
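The min-heap idea mentioned above could look roughly like the following k-way merge. It is a sketch under two assumptions that the thread does not confirm: each per-signal-name stream is already sorted by ascending timestamp, and streams can be read one element at a time. Here the streams are in-memory vectors and merge_earliest is a hypothetical name.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge streams that are each sorted by ascending timestamp and return the
/// `limit` earliest entries as (timestamp, stream index) pairs.
pub fn merge_earliest(streams: &[Vec<i64>], limit: usize) -> Vec<(i64, usize)> {
    // Heap entries are Reverse((ts, stream index, position in stream)), so
    // the smallest timestamp is popped first from the max-heap.
    let mut heap = BinaryHeap::new();
    for (stream_idx, stream) in streams.iter().enumerate() {
        if let Some(&ts) = stream.first() {
            heap.push(Reverse((ts, stream_idx, 0usize)));
        }
    }

    let mut out = Vec::new();
    while out.len() < limit {
        match heap.pop() {
            Some(Reverse((ts, stream_idx, pos))) => {
                out.push((ts, stream_idx));
                // Only now read the next element from the stream just consumed.
                if let Some(&next_ts) = streams[stream_idx].get(pos + 1) {
                    heap.push(Reverse((next_ts, stream_idx, pos + 1)));
                }
            }
            None => break,
        }
    }
    out
}
```

With lazy streams this reads at most limit plus one element per stream, instead of up to limit elements from every stream. Whether that saving justifies the extra complexity for small limits is a judgment call.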

Minor: Duplicate Index Insertion Logic

The code at db/kv/debug.rs:932-938 and db/kv/mod.rs:1502-1509 has duplicate logic for handling missing indices in indexed_input_chunks:

if let Some(input_chunks) = current_event.indexed_input_chunks.get_mut(key.index) {
    input_chunks.push(entry);
} else {
    current_event.indexed_input_chunks.insert(key.index, vec![entry]);
}

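This is correct but, if indexed_input_chunks is a map type, could be simplified to entry(key.index).or_insert_with(Vec::new).push(entry) for better readability. A Vec has no entry API, so the suggestion does not apply if the field is a Vec.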
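For reference, this is what the entry-based form looks like on a map. The sketch assumes a BTreeMap<usize, Vec<u8>>, which is a guess: the quoted get_mut(key.index) and insert(key.index, ...) calls are also consistent with a Vec of Vecs. add_chunk is a hypothetical helper name.

```rust
use std::collections::BTreeMap;

/// Append `entry` to the chunk list at `index`, creating the list on first use.
pub fn add_chunk(chunks: &mut BTreeMap<usize, Vec<u8>>, index: usize, entry: u8) {
    // or_default() inserts an empty Vec when the key is missing.
    chunks.entry(index).or_default().push(entry);
}
```

Unlike a Vec, a map accepts gaps between indices, so switching container types would also change what happens when an index is skipped.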

Minor: EventType Consistency

At db/kv/keys/history.rs:1645, the event type was changed from EventType::Signal to EventType::Signals. Verify this is intentional and that all consumers handle both variants correctly for backwards compatibility during rollout.

🎯 Suggestions

  1. Documentation: Add doc comments explaining when send() with graceful_not_found returns None vs. returning an error. The current doc comment is good but could clarify that callers should check for None and handle accordingly.

  2. Test Coverage: I don't see new tests for the graceful_not_found functionality. Consider adding tests that verify:

    • Signal send returns None when target workflow doesn't exist
    • Replay correctly handles removed signal events
    • Batch listen maintains correct ordering with multiple signal types
  3. Logging Consistency: At universaldb/transaction_conflict_tracker.rs:67, you changed tracing::info! to tracing::debug!. This is good for reducing log noise, but verify this doesn't impact any monitoring/alerting that depends on these logs.

  4. Runner2 Migration: At pegboard-serverless/lib.rs:483-511, you're trying runner2 first, then falling back to old runner workflow. Document this migration strategy and consider when the fallback can be removed.

📋 Nitpicks

  • Line db/kv/mod.rs:1945: let mut signals = ... has to stay mutable because sort_by_key sorts the Vec in place later; a short comment saying so would help
  • The pnpm-lock.yaml changes suggest dependency version bumps - verify these are intentional and don't introduce breaking changes
  • AsyncAPI schema changes are quite large (488 additions, 435 deletions) - ensure these are properly versioned

Overall Assessment

This is a solid PR that addresses real issues in signal handling and workflow loop performance. The graceful error handling pattern is a valuable addition to the API. The batch listen fix is important for correctness.

Recommendation: ✅ Approve with minor suggestions

The critical concern about batch listen limit handling should be verified/documented, but the code appears functionally correct. Consider adding tests for the new graceful handling behavior before merging.

@claude

claude bot commented Nov 19, 2025

Pull Request Review

Thanks for this PR @MasterPtato! I've reviewed the changes for fixing batch listen and history for graceful signal sending in workflows. Overall, the implementation looks solid with good attention to detail. Here are my findings:

🎯 Strengths

1. Graceful Signal Handling

  • The new graceful_not_found() API is well-designed and consistent across both common/signal.rs and workflow/signal.rs
  • Properly handles replay scenarios by checking for removed events (lines 116-126 in workflow/signal.rs)
  • Correctly creates removed events when signal targets aren't found (lines 176-189)
  • Return type change from Result<Id> to Result<Option<Id>> is appropriate and clearly documented

2. Batch Listen Fix

  • Correctly sorts signals by timestamp BEFORE taking the limit (line 1980 in db/kv/mod.rs)
  • Properly applies the limit only once after sorting (line 1995)
  • Fixed the key packing issue by using self.subspace.pack(&key) instead of tx.pack(&key) (line 2001)
  • Good parallelization of signal acknowledgment operations

3. Loop Event Optimization

  • Excellent optimization by deferring loop upsertion to run in parallel with the next iteration (lines 966-978 in ctx/workflow.rs)
  • The 4-stage tokio::join! is complex but well-documented with clear comments explaining the execution flow (lines 901-909)
  • Properly handles the deferred futures in sequential order

4. Error Handling Cleanup

  • Consistent pattern for replacing verbose error matching with cleaner graceful_not_found() + is_none() checks
  • All call sites updated consistently across api-peer, guard, pegboard, and pegboard-serverless

🔍 Issues & Concerns

1. Index Handling in Debug Code ⚠️
In gasoline/src/db/kv/debug.rs (lines 932-939) and gasoline/src/db/kv/mod.rs (lines 1505-1511):

if let Some(input_chunks) = current_event.indexed_input_chunks.get_mut(key.index) {
    input_chunks.push(entry);
} else {
    current_event.indexed_input_chunks.insert(key.index, vec![entry]);
}

Issue: The get_mut() followed by insert() pattern suggests the code is handling out-of-order indices, but there's an ensure! check just above (line 926-929) that should prevent this. This might indicate:

  • The ensure check isn't working as expected
  • Or the fallback insert() is defensive code that shouldn't be needed

Recommendation: Either remove the defensive insert() if the ensure guarantees order, or remove the ensure if out-of-order is expected. The current code is contradictory.
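Whether the two checks really contradict each other depends on what the ensure! asserts, which is not shown in this thread. A minimal sketch, assuming indexed_input_chunks is a Vec<Vec<_>> and the ensure only rejects indices that skip past the current length (both are assumptions, and the function names are hypothetical):

```rust
/// Shape an earlier review comment attributes to the previous code:
/// an entry for a not-yet-seen index is silently dropped.
pub fn add_chunk_buggy(chunks: &mut Vec<Vec<u8>>, index: usize, entry: u8) {
    if let Some(existing) = chunks.get_mut(index) {
        existing.push(entry);
    }
}

/// Bound check plus push-or-insert. `Vec::insert` panics when
/// `index > len`, so the check is what makes the else branch safe.
pub fn add_chunk_checked(
    chunks: &mut Vec<Vec<u8>>,
    index: usize,
    entry: u8,
) -> Result<(), String> {
    if index > chunks.len() {
        return Err(format!("index {} skips past len {}", index, chunks.len()));
    }
    if let Some(existing) = chunks.get_mut(index) {
        existing.push(entry);
    } else {
        // get_mut missed and index <= len, so index == len here:
        // this insert is equivalent to chunks.push(vec![entry]).
        chunks.insert(index, vec![entry]);
    }
    Ok(())
}
```

Under these assumptions the two checks are complementary rather than contradictory: the bound check rules out gaps, and the else branch handles the one remaining case where the slot for the next index does not exist yet.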

2. Missing Test Coverage ⚠️
I searched for tests related to batch listen and graceful signal handling but didn't find any specific test cases for:

  • Batch listen with multiple signals ensuring correct timestamp ordering
  • Graceful signal sending when target workflow doesn't exist
  • Replay of gracefully failed signals

Recommendation: Add test coverage for these critical scenarios, especially:

// Test that batch listen returns signals in timestamp order
// Test that graceful_not_found returns None without error
// Test that replaying a gracefully failed signal works correctly

3. History Event Type Inconsistency ℹ️
In db/kv/keys/history.rs line 1645, changed from EventType::Signal to EventType::Signals. While this appears intentional for batch operations, ensure this doesn't break any existing history queries or filters that expect EventType::Signal.

4. Potential Race Condition ⚠️
In db/kv/mod.rs (lines 1944-1976), signals are fetched from multiple streams in parallel and then sorted. Between the fetch and sort:

  • New signals could arrive
  • The limit is applied AFTER collecting all streams

Question: Should the limit be applied per-stream or globally? Current implementation applies it globally after merging, which seems correct but could result in fetching more than limit signals total from the database before filtering.

🎨 Code Quality

Positive:

  • Follows CLAUDE.md conventions (lowercase log messages, structured logging with ?x)
  • Proper use of workspace dependencies
  • Good use of tracing instrumentation
  • Comments explain complex async operations

Minor:

  • Log message style is inconsistent: "signal receive" vs "signal recv" (line 626 in engine/src/util/wf/mod.rs)

🔒 Security

No security concerns identified. The changes don't introduce:

  • SQL injection risks
  • Command injection
  • Unauthorized access patterns
  • Information disclosure

⚡ Performance

Positive:

  • Excellent optimization of loop event commits using deferred futures
  • Batch signal acknowledgment runs in parallel
  • Proper use of tokio::join! for parallelization

Note: The sorting of signals (line 1980) could be expensive with many signals, but this is necessary for correctness. Consider adding metrics to monitor the number of signals being sorted.

📊 Summary

Category       Rating  Notes
Code Quality   8/10    Well-structured, follows conventions
Test Coverage  5/10    Missing tests for new features
Performance    9/10    Excellent optimizations
Security       10/10   No concerns
Documentation  7/10    Code comments good, API docs could be better

✅ Recommendations

  1. High Priority: Clarify the index handling logic in debug.rs (remove contradictory code)
  2. High Priority: Add test coverage for batch listen ordering and graceful signal handling
  3. Medium Priority: Add integration tests that verify the complete signal lifecycle with graceful failures
  4. Low Priority: Consider adding metrics for batch listen operations (count, latency, signals per batch)
  5. Low Priority: Standardize log message format ("signal recv" everywhere)

🚀 Approval

Despite the missing test coverage, the core implementation is sound and the changes are necessary fixes. I recommend:

  • ✅ Approving this PR for merge
  • 📝 Creating a follow-up issue to add comprehensive test coverage
  • 🔍 Monitoring the index handling in debug code after deployment

Great work on the complex async optimizations! The deferred loop upsertion is particularly clever.

@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from aa9ef2a to dfd167c Compare November 20, 2025 03:07
@MasterPtato MasterPtato force-pushed the 11-17-feat_gas_add_batch_listen branch from dba469c to 2a53bcf Compare November 20, 2025 03:07
@claude

claude bot commented Nov 20, 2025

Pull Request Review

Summary

This PR introduces three significant improvements to the Gasoline workflow engine:

  1. Graceful signal send handling - Allows signals to gracefully handle cases where the target workflow doesn't exist
  2. Batch signal listening fix - Corrects the event type from Signal to Signals and fixes signal ordering/chunking issues
  3. Loop optimization - Improves performance by deferring database updates in workflow loops

Code Quality & Best Practices

✅ Strengths

  1. Consistent API Design: The graceful_not_found() builder method follows the existing builder pattern consistently across both common/signal.rs and workflow/signal.rs.

  2. Proper Event Replay Handling: The workflow signal builder correctly handles replay scenarios by checking for removed events before attempting to send (lines 116-126 in builder/workflow/signal.rs).

  3. Good Documentation: The return type change is well-documented with clear comments explaining when None vs Some(Id) will be returned.

  4. Performance Optimization: The loop optimization (lines 910-979 in ctx/workflow.rs) cleverly defers database upserts to run in parallel with the next iteration, reducing serial database operations.

⚠️ Issues & Concerns

1. Potential Race Condition in Signal Sorting (Medium Priority)

Location: db/kv/mod.rs:1945-1980

The batch listen implementation now sorts signals by timestamp after fetching them from multiple streams. However, there's a potential issue:

let mut signals = futures_util::stream::iter(owned_filter.clone())
    .map(|signal_name| { /* fetch from each stream */ })
    .flatten()
    .try_collect::<Vec<_>>()
    .await?;

// Sort by ts AFTER collection
signals.sort_by_key(|key| key.ts);

Concern: The limit is applied per stream during the fetch (line 1957), but sorting happens afterwards. This means:

  • If you request limit=10 and have 3 signal types, you could fetch up to 30 signals
  • After sorting, you only take the first 10 (line 1995)
  • This could lead to unexpected signal ordering if signals arrived in a different order than their timestamps

Recommendation: Consider either:

  • Applying the limit after sorting (which you do on line 1995, so this may be intentional)
  • Or clarifying the behavior in comments since this changes semantics

Actually, on closer inspection: You do take(limit) on line 1995 after sorting, so this appears correct. However, the per-stream limit on line 1957 might still cause issues if one stream has many old signals and another has newer ones.

2. Missing Index Initialization Bug Fix (High Priority - Good catch!)

Location: db/kv/debug.rs:932-938 and db/kv/mod.rs:1505-1512

} else {
    current_event
        .indexed_input_chunks
        .insert(key.index, vec![entry]);
}

This fixes a bug where indexed_input_chunks might not have an entry for a given index. Great defensive programming! However:

Question: Is this a bug fix that should be mentioned in the commit message? It seems like it could cause crashes if not present.

3. Return Type Breaking Change (High Priority)

Location: All signal send callsites

The send() method now returns Result<Option<Id>> instead of Result<Id>. While all callsites in this PR have been updated correctly, this is a breaking API change.

Current callsite pattern:

let res = ctx.signal(Destroy {})
    .graceful_not_found()
    .send()
    .await?;
if res.is_none() {
    tracing::warn!("workflow not found");
}

Recommendations:

  • ✅ All callsites in this PR correctly handle the Option return
  • ⚠️ Consider if there are callsites outside this PR that need updating
  • ⚠️ This might warrant a minor version bump or migration guide

4. Removed Event Handling - Edge Case

Location: builder/workflow/signal.rs:116-126

if self.graceful_not_found && self.ctx.cursor().is_removed() {
    self.ctx.cursor().compare_removed::<RemovedSignal<T>>()?;
    // ...
    return Ok(None);
}

Observation: This check happens before the version comparison. Is this intentional?

Potential issue: If a workflow version changes and a signal that was previously gracefully not found is now sent successfully, the replay might incorrectly skip sending the signal.

Recommendation: Verify the order of checks is correct, or add a comment explaining why removed check comes before version check.

5. Logging Level Change

Location: universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs:67

-tracing::info!(
+tracing::debug!(

Question: Was this causing log spam in production? This seems like a good change but might be worth mentioning in the commit message.

Performance Considerations

✅ Excellent Optimizations

  1. Parallel Loop Execution (ctx/workflow.rs:910-979): The refactoring to run loop initialization, upsert, branch commit, and user code in parallel via tokio::join! is excellent. This should significantly reduce latency for tight loops.

  2. Deferred Upserts (ctx/workflow.rs:966-978): Only upserting every LOOP_ITERS_PER_COMMIT iterations and deferring the future to run in parallel with the next iteration is a clever optimization.
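The deferral pattern described in these two points can be illustrated without an async runtime. The sketch below swaps tokio::join! for scoped OS threads from the standard library, so it only models the overlap, not the actual futures. ITERS_PER_COMMIT is a made-up value standing in for LOOP_ITERS_PER_COMMIT, and run_loop is a hypothetical name.

```rust
use std::sync::Mutex;
use std::thread;

/// Made-up value for the sketch; the real constant lives in gasoline.
pub const ITERS_PER_COMMIT: usize = 3;

/// Runs `iterations` loop bodies and returns (executed iterations,
/// committed loop states). Every ITERS_PER_COMMIT iterations the state is
/// upserted, but the upsert is deferred so it overlaps with the next body.
pub fn run_loop(iterations: usize) -> (Vec<usize>, Vec<usize>) {
    let committed = Mutex::new(Vec::new());
    let mut executed = Vec::new();
    let mut deferred_upsert: Option<usize> = None;

    for iteration in 0..iterations {
        let pending = deferred_upsert.take();
        let committed_ref = &committed;
        thread::scope(|scope| {
            // Upsert deferred by the previous iteration, if any.
            if let Some(state) = pending {
                scope.spawn(move || {
                    committed_ref.lock().unwrap().push(state);
                });
            }
            // The body of this iteration runs while the upsert is in flight.
            executed.push(iteration);
        }); // the scope joins the upsert thread before the loop continues

        if (iteration + 1) % ITERS_PER_COMMIT == 0 {
            deferred_upsert = Some(iteration + 1);
        }
    }

    // Flush an upsert deferred by the final iteration.
    if let Some(state) = deferred_upsert {
        committed.lock().unwrap().push(state);
    }
    (executed, committed.into_inner().unwrap())
}
```

The final flush is the part that is easy to miss: without it, a loop whose last iteration lands on a commit boundary would exit with its latest state never written.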

⚠️ Potential Performance Issues

  1. Signal Sorting on Every Batch (db/kv/mod.rs:1980): Sorting signals by timestamp on every batch listen could be expensive if many signals are pending. Consider:

    • Is the timestamp ordering guaranteed by the database key structure?
    • If so, could we avoid the sort?
    • If not, is in-memory sort acceptable for large batches?
  2. Renamed Variables: The renaming of loop_event_commit_fut to loop_event_init_fut and introduction of loop_event_upsert_fut is clear, but the join now has 4 futures instead of 3. Verify this doesn't hurt performance (though it should be fine since they run concurrently).
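On the first point, whether the key structure already guarantees timestamp order depends on the key layout, which this thread does not show. As an illustration only, assume a hypothetical layout of signal name, a zero separator byte, then the timestamp as 8 big-endian bytes (unsigned here to sidestep sign handling):

```rust
/// Hypothetical key layout, not the gasoline encoding:
/// name bytes, 0x00 separator, then the timestamp as 8 big-endian bytes.
pub fn encode_key(signal_name: &str, ts: u64) -> Vec<u8> {
    let mut key = signal_name.as_bytes().to_vec();
    key.push(0);
    // Big-endian makes byte-wise comparison agree with numeric comparison.
    key.extend_from_slice(&ts.to_be_bytes());
    key
}
```

With such a layout a range scan returns each signal name's keys in timestamp order, but keys for different names are grouped by name first. Under that assumption some merge or sort across signal types is still needed, though each per-name stream would arrive pre-sorted.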

Security Concerns

✅ No Major Security Issues

The changes don't introduce obvious security vulnerabilities. The graceful handling of missing workflows is actually more secure than potentially exposing workflow existence through error messages.

Test Coverage

❌ Missing Tests

Concern: This PR adds no test files. Given the complexity of the changes, especially:

  • Graceful signal send behavior
  • Signal replay with removed events
  • Loop optimization edge cases
  • Batch signal ordering

Recommendation: Add tests covering:

  1. Signal send with graceful_not_found() when workflow doesn't exist
  2. Signal send replay when a removed event exists
  3. Batch signal listen with multiple signal types arriving out of order
  4. Loop optimization with various LOOP_ITERS_PER_COMMIT scenarios

Additional Observations

Minor Issues

  1. Display Text Changes: The changes to event display text ("signal receive" → "signal", "signals" → "signal recv") in engine/src/util/wf/mod.rs seem cosmetic. Are these intentional UX improvements?

  2. Runner Workflow Migration: pegboard-serverless/src/lib.rs shows a backward compatibility attempt to try runner2 first, then fall back to runner. Is there a migration plan to remove the old runner workflow?

  3. Indentation Changes: The signal display indentation changes in engine/src/util/wf/mod.rs:468-492 look good but are cosmetic.

Summary Recommendations

Required Before Merge

  1. ✅ Verify all signal send callsites handle Option<Id> return (appears complete in this PR)
  2. ⚠️ Add test coverage for new functionality
  3. ⚠️ Clarify if the indexed_input_chunks bug fix should be called out in the commit message

Nice to Have

  1. Add comments explaining signal sorting behavior in batch listen
  2. Add comment explaining why removed event check comes before version check
  3. Consider migration plan for old runner workflow removal
  4. Update documentation if this is a breaking API change

Overall Assessment

This is a well-implemented PR that adds valuable functionality and performance improvements. The code quality is good, follows existing patterns, and the graceful signal handling is implemented consistently across both contexts (common and workflow).

Main concerns: Test coverage and the breaking API change should be addressed before merge.

Rating: ⭐⭐⭐⭐ (4/5) - High quality with minor concerns


claude bot commented Nov 20, 2025

Code Review

Summary

This PR fixes two important issues in the Gasoline workflow engine:

  1. Batch listen fix: Signals are now sorted by timestamp and properly limited before processing
  2. Graceful signal send: Adds graceful_not_found() method to handle missing workflow targets without errors, with proper history tracking for replay

Positive Observations

Strong Design Decisions

  • Consistent API: The graceful_not_found() pattern is consistently applied across both workflow and common signal builders
  • Proper replay support: The implementation correctly handles replaying graceful_not_found signals by checking for and comparing removed events (lines 116-126 in builder/workflow/signal.rs)
  • Clear return type: Changing the return type from Result<Id> to Result<Option<Id>> provides clear semantics about what happened
  • Parallelization optimization: Loop optimization in ctx/workflow.rs properly defers database operations for parallel execution

Good Code Quality

  • Error handling: Proper early returns when graceful_not_found is set and workflow isn't found
  • Documentation: Clear doc comments explaining the behavior (lines 95, 106-107)
  • Logging: Appropriate debug logging at key decision points
  • Backward compatibility: The pegboard-serverless migration handles both old and new runner workflows gracefully

Issues & Concerns

1. Critical: Potential Race Condition in Batch Listen

Location: db/kv/mod.rs:1945-1980

The batch listen implementation fetches from multiple streams, each with limit: Some(limit) (line 1957), and only sorts the merged signals afterwards. Consider Stream A with signals at timestamps [100, 200, 300] and Stream B with signals at [50, 150, 250], with limit 2. This fetches [100, 200] from A and [50, 150] from B, sorts them to [50, 100, 150, 200], then takes 2 to get [50, 100]. In this example the result is still the two earliest signals overall, so the per-stream limit can only drop an earlier signal if a stream does not return its own earliest signals first. That ordering guarantee should be verified.

Recommendation: Remove the per-stream limit and only apply limit after sorting, OR fetch more than needed from each stream.
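
The example above can be traced with a small sketch. PendingSignal and batch_listen are hypothetical names for this illustration, not the code in db/kv/mod.rs, and the sketch assumes each stream already yields its signals in timestamp order.

```rust
/// Hypothetical stand-in for a pending signal; only the timestamp matters here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingSignal {
    pub name: &'static str,
    pub ts: u64,
}

/// Takes up to `limit` signals from each stream (each stream is assumed to be
/// ordered by timestamp), merges them, sorts the merged list by timestamp, and
/// keeps the earliest `limit` signals overall.
pub fn batch_listen(streams: &[Vec<PendingSignal>], limit: usize) -> Vec<PendingSignal> {
    let mut merged: Vec<PendingSignal> = streams
        .iter()
        .flat_map(|stream| stream.iter().take(limit).cloned())
        .collect();

    // sort_by_key is stable, so signals with equal timestamps keep their
    // relative order from the merge.
    merged.sort_by_key(|signal| signal.ts);
    merged.truncate(limit);
    merged
}
```

With the two streams from the example and a limit of 2, this keeps the signals at timestamps 50 and 100, which matches what sorting all six signals first would return. Whether the per-stream limit is safe in the real code therefore depends on whether each stream fetch returns its earliest entries first.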

2. Bug: Missing Removal Event in Common Signal Builder

Location: builder/common/signal.rs:152-161

The common (non-workflow) signal builder returns Ok(None) when gracefully handling not found, but it doesn't create a removed event for history tracking. Compare with the workflow version (builder/workflow/signal.rs:175-185) which properly commits a removed event.

Impact: If this code path is used in any replay scenarios, the replay will fail due to missing history events.

Recommendation: Add the removed event commit in the common builder as well, or document why it's not needed.

3. Inconsistency: Signal ID Generation

Location: builder/common/signal.rs:126

In the common builder, signal_id is generated even when graceful_not_found is true and might return early. This wastes ID generation when the signal won't be sent. The workflow builder generates the ID only in the "Send signal" branch (line 146).

Recommendation: Move ID generation to after the workflow lookup check.

4. Test Coverage Concern

No new tests were added for the graceful_not_found() functionality. This is a complex feature involving workflow not found scenarios, replay with removed events, and both workflow and common signal builders.

Recommendation: Add tests covering graceful signal send to non-existent workflow, replay of gracefully not found signal, and graceful signal in loops.

5. Minor: Duplicate Code in Batch Listen

Location: db/kv/debug.rs:935-938 and db/kv/mod.rs:2005-2008

The same logic for handling indexed_input_chunks appears twice, suggesting the initialization might be missing elsewhere.

Performance Considerations

Positive

  • Loop optimization properly parallelizes database operations
  • Batch listen now limits signals before processing them

Concerns

  • The batch listen still processes all signal streams in parallel which could be resource-intensive with many signal types

Security Review

No security issues identified. The changes don't introduce SQL injection risks, authentication/authorization bypass, data leakage, or command injection.

Documentation

  • Good inline documentation for public APIs
  • Missing documentation about when to use graceful_not_found vs regular signal send
  • The batch listen sorting behavior should be documented

Recommendations Summary

Must Fix:

  1. Fix the race condition in batch listen by applying limit only after sorting all signals
  2. Add removed event to common signal builder or document why it's not needed

Should Fix:

  3. Add test coverage for graceful_not_found() functionality
  4. Move signal ID generation in common builder to after workflow lookup

Nice to Have:

  5. Add higher-level documentation about the graceful signal pattern
  6. Review the duplicate indexed_input_chunks insertion code

Verdict

This PR makes important improvements to reliability (graceful signal handling) and correctness (batch listen ordering). However, the batch listen race condition is critical and should be addressed before merging. The missing test coverage is also concerning for such a significant behavior change.

Recommendation: Request changes to fix the batch listen limit issue and add test coverage.

@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from dfd167c to bcf4d5d Compare November 21, 2025 19:57
@MasterPtato MasterPtato force-pushed the 11-17-feat_gas_add_batch_listen branch from 2a53bcf to 633c857 Compare November 21, 2025 19:57

claude bot commented Nov 21, 2025

PR Review: fix(gas): fix batch listen, fix history for graceful signal send in workflows

Summary

This PR introduces several fixes to the gasoline workflow engine:

  1. Adds a graceful_not_found() builder method for signals that returns Ok(None) instead of erroring when the target workflow doesn't exist
  2. Fixes batch signal listening by sorting signals by timestamp before processing
  3. Fixes history event type for batch signals (EventType::Signal → EventType::Signals)
  4. Updates various callers to use the new graceful signal API
  5. Minor display/debug output improvements

Code Quality ✅

Good patterns observed:

  • The graceful_not_found() builder pattern is well-implemented and follows existing patterns in the codebase
  • Proper handling of replay scenarios for graceful signals by storing removed events in history (engine/packages/gasoline/src/builder/workflow/signal.rs:116-126)
  • The return type change from Result<Id> to Result<Option<Id>> is a cleaner API than catching and filtering errors

Suggestions:

  • The new is_removed() helper in cursor.rs:478-485 is clean, but consider adding a brief doc comment explaining its purpose in signal replay scenarios

Potential Bugs / Issues ⚠️

  1. Signal sorting location (engine/packages/gasoline/src/db/kv/mod.rs:1976-1977):
    The signals are now sorted by key.ts before inserting the history event. This is correct. However, the take(limit) is now applied after sorting, which changes behavior - previously signals were fetched with limit, then sorted. Now they're fetched with limit per signal type, merged, sorted, then limited again. This seems intentional but should be verified.

  2. Missing graceful_not_found for to_workflow_id path (engine/packages/gasoline/src/builder/common/signal.rs:167-173):
    The graceful_not_found flag is only checked for the to_workflow_name path. When sending via to_workflow_id, if the workflow doesn't exist, the publish_signal call may still fail. Is this intentional? The workflow signal builder has similar logic.

  3. Debug output inconsistency (engine/packages/gasoline/src/db/debug.rs:125):
    Changed to "signal receive" but in engine/packages/engine/src/util/wf/mod.rs:626 it's "signal recv". Consider using consistent naming.


Performance Considerations ✅

  1. Loop event parallelization improvement (engine/packages/gasoline/src/ctx/workflow.rs:869-960):
    The deferred loop_event_upsert_fut optimization is a good change - it allows the upsert to run in parallel with the next iteration. This reduces latency for long-running loops.

  2. Signal batch sorting (engine/packages/gasoline/src/db/kv/mod.rs:1976-1977):
    Sorting happens after fetching all signals but before processing. This is O(n log n), where n is at most the limit multiplied by the number of signal types (the limit is applied per signal type before the merge), so this is acceptable.


Security Considerations ✅

No security concerns identified. The changes are internal to workflow engine logic and don't expose new attack surfaces.


Test Coverage 🔶

No new tests appear to be added. Consider adding:

  • Unit tests for graceful_not_found() behavior in both common and workflow signal builders
  • Test for replay of a graceful signal that previously had no target
  • Test for batch signal ordering

Minor Issues

  1. Trailing comma consistency: Some debug logs use a trailing comma after the last parameter, others don't. Not a big deal but worth noting.

  2. Log level for conflict tracker (engine/packages/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs:67):
    Changed from info to debug - good change, this reduces noise in production logs.

  3. pnpm-lock.yaml changes: These appear to be dependency resolution changes, not directly related to the PR. Verify these are intentional.


Overall Assessment

This is a solid fix that improves the ergonomics of signal handling when target workflows may not exist. The code follows existing patterns and the implementation is clean. The batch listen fix with timestamp ordering is important for correctness.

Recommendation: Approve with minor suggestions above.

🤖 Generated with Claude Code


claude bot commented Nov 21, 2025

PR Review: fix(gas): fix batch listen, fix history for graceful signal send in workflows

Summary

This PR introduces several improvements to the gasoline workflow engine:

  1. New graceful_not_found() API for signal sending - allows callers to gracefully handle cases where a signal target workflow doesn't exist
  2. Batch listen fix - fixes signal ordering and event type in batch listen operations
  3. Loop optimization - defers loop state upsertion for better parallelization
  4. Debugging improvements - fixes print formatting and consistency in history display
  5. Various cleanup - removes verbose error-handling boilerplate across multiple call sites

✅ Positive Changes

1. Clean API Design
The graceful_not_found() builder method is well-designed and follows the existing builder pattern consistently in both builder/common/signal.rs and builder/workflow/signal.rs.

2. Proper History Tracking
The workflow signal builder correctly inserts a "removed" event when the target workflow is not found (lines 175-188 in builder/workflow/signal.rs), ensuring workflow replay works correctly.

3. Improved Code Readability
The refactoring from:

if let Some(WorkflowError::WorkflowNotFound) = res
    .as_ref()
    .err()
    .and_then(|x| x.chain().find_map(|x| x.downcast_ref::<WorkflowError>()))
{ ... }

to:

if res.is_none() { ... }

is much cleaner and easier to understand.
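
A minimal sketch of how a Result<Option<Id>> return can work, to make the three outcomes concrete. Everything here is hypothetical: SignalBuilder, SendError, the u64 signal ID, and the name lookup in a HashSet are stand-ins for illustration, and the real builders in builder/common/signal.rs and builder/workflow/signal.rs are async and talk to the database.

```rust
use std::collections::HashSet;

/// Hypothetical error type standing in for the engine's workflow error.
#[derive(Debug, PartialEq, Eq)]
pub enum SendError {
    WorkflowNotFound,
}

/// Hypothetical, synchronous stand-in for a signal builder.
pub struct SignalBuilder<'a> {
    existing_workflows: &'a HashSet<String>,
    to_workflow_name: &'a str,
    signal_id: u64,
    graceful_not_found: bool,
}

impl<'a> SignalBuilder<'a> {
    pub fn new(
        existing_workflows: &'a HashSet<String>,
        to_workflow_name: &'a str,
        signal_id: u64,
    ) -> Self {
        SignalBuilder {
            existing_workflows,
            to_workflow_name,
            signal_id,
            graceful_not_found: false,
        }
    }

    /// Opt in to `Ok(None)` instead of an error when the target is missing.
    pub fn graceful_not_found(mut self) -> Self {
        self.graceful_not_found = true;
        self
    }

    /// Returns `Ok(Some(id))` when the signal is sent, `Ok(None)` when the
    /// target is missing and graceful mode is on, and an error otherwise.
    pub fn send(self) -> Result<Option<u64>, SendError> {
        if !self.existing_workflows.contains(self.to_workflow_name) {
            return if self.graceful_not_found {
                Ok(None)
            } else {
                Err(SendError::WorkflowNotFound)
            };
        }

        Ok(Some(self.signal_id))
    }
}
```

A caller that opts in with graceful_not_found() can then branch on res.is_none() as in the snippet above, instead of walking the error chain.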

4. Bug Fix: Batch Listen Signal Ordering
Good catch on sorting signals by timestamp before processing them (signals.sort_by_key(|key| key.ts) at line 1977 in db/kv/mod.rs). This ensures deterministic ordering.

5. Bug Fix: Event Type Correction
Fixing EventType::Signal to EventType::Signals in keys/history.rs:1645 is important for correct history tracking of batch signal events.

6. Performance: Deferred Loop Upsertion
The new loop_event_upsert_fut pattern in ctx/workflow.rs that defers loop state writes to the next iteration for parallelization is a nice optimization.


⚠️ Potential Concerns

1. Breaking API Change
The return type of send() changed from Result<Id> to Result<Option<Id>>. This is a breaking change that will require all callers to be updated. While the PR updates all call sites in this repo, downstream consumers may be affected.

Suggestion: Ensure this is called out in release notes if there are external consumers of this API.

2. Missing graceful_not_found handling in to_workflow_id path

In builder/common/signal.rs lines 167-173, when sending to a workflow by ID directly (not by tags), there's no graceful_not_found handling:

(None, Some(workflow_id), true) => {
    tracing::debug!(to_workflow_id=%workflow_id, "dispatching signal via workflow id");

    self.db
        .publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, &input_val)
        .await?;
}

If publish_signal fails because the workflow doesn't exist, this won't gracefully return None. This could be intentional (ID implies certainty of existence), but worth verifying the behavior is expected.

3. Fallback Logic in drain_runner

The fallback from runner2::Workflow to runner::Workflow in pegboard-serverless/src/lib.rs:492-509 suggests a migration is in progress. Consider:

  • Adding a TODO comment explaining when the fallback can be removed
  • Adding metrics/logging to track how often the fallback is hit
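
One possible shape for both suggestions, as a sketch only: drain_with_fallback and LEGACY_FALLBACK_HITS are hypothetical names, the closures stand in for the graceful sends to runner2::Workflow and runner::Workflow, and a plain atomic counter stands in for whatever metrics system the project uses.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counter for how often the legacy workflow had to be used.
pub static LEGACY_FALLBACK_HITS: AtomicU64 = AtomicU64::new(0);

/// Tries the new workflow first and falls back to the legacy one when the
/// first send reports that no target was found (`None`).
pub fn drain_with_fallback<F, G>(send_new: F, send_legacy: G) -> Option<u64>
where
    F: FnOnce() -> Option<u64>,
    G: FnOnce() -> Option<u64>,
{
    // TODO: remove the fallback once no legacy runner workflows remain.
    if let Some(signal_id) = send_new() {
        return Some(signal_id);
    }

    // Record the fallback so its frequency can be tracked over time.
    LEGACY_FALLBACK_HITS.fetch_add(1, Ordering::Relaxed);
    send_legacy()
}
```

Once the counter stays flat across deployments, that is a reasonable signal that the fallback branch can be deleted.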

4. Log Level Demotion

In universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs, changing tracing::info! to tracing::debug! for conflict range logging may hide useful debugging information in production. Ensure this is intentional and that conflicts can still be diagnosed when needed.


🔍 Minor Suggestions

  1. Inconsistent naming in debug output (db/debug.rs):

    • Line 125 outputs "signal receive" while engine/src/util/wf/mod.rs:626 outputs "signal recv". Consider being consistent.

  2. Doc comment for is_removed() (history/cursor.rs:478-485):

    • The new helper function lacks a doc comment. Consider adding one for consistency with compare_removed().

  3. Debug insertion fix in db/kv/debug.rs:

    • Lines 935-937 add a missing else branch to insert into indexed_input_chunks. This looks like an important bug fix that should be tested.

📋 Test Coverage

I don't see any new tests added in this PR. Given the changes include:

  • New API surface (graceful_not_found)
  • Bug fixes in batch listen ordering
  • History event type corrections
  • Replay logic changes

Consider adding tests for:

  • graceful_not_found returning None when target doesn't exist
  • Replay of a signal that previously had no target (removed event)
  • Batch listen signal ordering

Verdict

The changes are generally well-implemented and address real issues. The API design is clean and follows existing patterns. The main concerns are:

  1. Ensure the breaking API change is documented
  2. Consider test coverage for the new graceful handling path
  3. Clarify when the runner/runner2 fallback can be removed

Overall: Looks good with minor suggestions 👍
